dubbo集群容错和负载均衡

您所在的位置:网站首页 容错 负载均衡 dubbo集群容错和负载均衡

dubbo集群容错和负载均衡

2023-11-21 08:50| 来源: 网络整理| 查看: 265

参考资料:

敖丙-dubbo集群容错负载均衡

在dubbo服务订阅的文章中,我们曾经给出过这样一张图,实际上消费者只拿到一个Invoker,这个Invoker经过多次封装,具备集群容错、路由、负载均衡的能力。这篇文章,我们深入了解dubbo的集群容错、路由过滤和负载均衡。

image-20201223164509212

服务目录

Directory,服务目录,之前我们提过,它有一个属性urlInvokerMap,存储多个提供者的URL。除此以外,服务目录还需要监听注册中心。因为服务提供者并不是保持不变的,有时候可能出现机器宕机,或者新的提供者上线,提供者数量和信息是动态变化的。

下面是Directory的类图,主要看下RegistryDirectory,主要有三个作用,分别是获取invoker列表,监听注册中心,以及更新inivoker。

image-20210101155247031

获取invoker列表

在Directory接口中,只定义两个方法,一个是返回服务接口的class对象,一个是返回invoker列表。

public interface Directory extends Node { /** * get service type. * * @return service type. */ Class getInterface(); /** * list invokers. * * @return invokers */ List list(Invocation invocation) throws RpcException; }

RegistryDirectory是根据方法名从缓存(localMethodInvokerMap)找到对应的invoker集合,源码如下。

com.alibaba.dubbo.registry.integration.RegistryDirectory#doList

@Override public List doList(Invocation invocation) { if (forbidden) { // 1. No service provider 2. Service providers are disabled throw new RpcException("xxxxx"); } List invokers = null; Map localMethodInvokerMap = this.methodInvokerMap; // local reference if (localMethodInvokerMap != null && localMethodInvokerMap.size() > 0) { // invocation 应该是RpcInvocation,封装方法名、方法参数 String methodName = RpcUtils.getMethodName(invocation); Object[] args = RpcUtils.getArguments(invocation); // 下面一大段就是表达,通过方法名,获取到对应的invoker列表 if (args != null && args.length > 0 && args[0] != null && (args[0] instanceof String || args[0].getClass().isEnum())) { // The routing can be enumerated according to the first parameter // 根据方法名和第一个参数 invokers = localMethodInvokerMap.get(methodName + "." + args[0]); } if (invokers == null) { invokers = localMethodInvokerMap.get(methodName); } if (invokers == null) { invokers = localMethodInvokerMap.get(Constants.ANY_VALUE); } if (invokers == null) { Iterator iterator = localMethodInvokerMap.values().iterator(); if (iterator.hasNext()) { invokers = iterator.next(); } } } return invokers == null ? new ArrayList(0) : invokers; } 监听注册中心

在服务订阅的文章中,我们提到消费者会去监听configurators、routers和providers节点。过程就是,获取所有节点的url信息,根据category的参数值分门别类,接着把URL转成对应的Configurator和Router对象,最后刷新提供者的invoker列表。

Image

com.alibaba.dubbo.registry.integration.RegistryDirectory#notify

@Override public synchronized void notify(List urls) { // 用来保存configurators、routers、providers节点的url信息 List invokerUrls = new ArrayList(); List routerUrls = new ArrayList(); List configuratorUrls = new ArrayList(); for (URL url : urls) { // 根据协议和类别,把url添加到对应的集合 String protocol = url.getProtocol(); String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY); if (Constants.ROUTERS_CATEGORY.equals(category) || Constants.ROUTE_PROTOCOL.equals(protocol)) { routerUrls.add(url); } else if (Constants.CONFIGURATORS_CATEGORY.equals(category) || Constants.OVERRIDE_PROTOCOL.equals(protocol)) { configuratorUrls.add(url); } else if (Constants.PROVIDERS_CATEGORY.equals(category)) { invokerUrls.add(url); } else { logger.warn("xxxx"); } } // configurators if (configuratorUrls != null && !configuratorUrls.isEmpty()) { // 把URL转成Configurator对象 this.configurators = toConfigurators(configuratorUrls); } // routers if (routerUrls != null && !routerUrls.isEmpty()) { // 把URL转成Router对象 List routers = toRouters(routerUrls); if (routers != null) { // null - do nothing setRouters(routers); } } List localConfigurators = this.configurators; // local reference // merge override parameters this.overrideDirectoryUrl = directoryUrl; if (localConfigurators != null && !localConfigurators.isEmpty()) { for (Configurator configurator : localConfigurators) { this.overrideDirectoryUrl = configurator.configure(overrideDirectoryUrl); } } // providers // 刷新provider refreshInvoker(invokerUrls); } 刷新invoker列表

上面notify方法先去处理和保存configurators和routers节点信息,最后刷新provider信息,我们看下代码:

com.alibaba.dubbo.registry.integration.RegistryDirectory#refreshInvoker

private void refreshInvoker(List invokerUrls) { if (invokerUrls != null && invokerUrls.size() == 1 && invokerUrls.get(0) != null && Constants.EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) { this.forbidden = true; // Forbid to access this.methodInvokerMap = null; // Set the method invoker map to null destroyAllInvokers(); // Close all invokers } else { this.forbidden = false; // Allow to access Map oldUrlInvokerMap = this.urlInvokerMap; // local reference if (invokerUrls.isEmpty() && this.cachedInvokerUrls != null) { invokerUrls.addAll(this.cachedInvokerUrls); } else { this.cachedInvokerUrls = new HashSet(); //Cached invoker urls, convenient for comparison this.cachedInvokerUrls.addAll(invokerUrls); } if (invokerUrls.isEmpty()) { return; } // Translate url list to Invoker map // 把URL对象转成Invoker Map newUrlInvokerMap = toInvokers(invokerUrls); // Change method name to map Invoker Map // 根据方法名分类 Map newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); // state change // If the calculation is wrong, it is not processed. if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) { logger.error("xxxxxxx"); return; } this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap; this.urlInvokerMap = newUrlInvokerMap; try { destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker } catch (Exception e) { logger.warn("destroyUnusedInvokers error. ", e); } } }

刷新列表的过程是,先把集合里面的URL对象转成Invoker,得到一个**的Map。接着,根据方法名归类,得到一个**的映射关系。最后,将同一个组的Invoker合并,得到methodInvokerMap,也就是doList方法中通过方法名返回对应invoker List的缓存。

服务路由

dubbo提供三种路由规则,分别是条件路由、脚本路由和标签路由,具体路由规则可参考官网资料

最常用的是条件路由,格式是 消费者匹配条件 => 提供者地址列表过滤条件,比如host = 10.20.153.10 => host = 10.20.153.11 表示ip为10.20.153.10的消费者只能调用ip是10.20.153.11的提供者。

路由的配置是通过RegistryDirectory#notify更新信息

image-20210101213959853

路由的调用是在刷新invoker时,获取method与invoker列表映射的方法中会进行服务级别和方法级别的路由。

com.alibaba.dubbo.registry.integration.RegistryDirectory#toMethodInvokers

image-20210101214602547

集群

经过路由过滤后,可能还有多个满足条件的invoker,dubbo会把这些invoker封装成ClusterInvoker,最终给到消费者调用的只有一个invoker。

clusterInvoker内部封装各种操作,比如快速失败、失败切换等。Dubbo默认的cluster实现有很多,主要有以下几种:

image-20210103210111813

@SPI(FailoverCluster.NAME) public interface Cluster { /** * Merge the directory invokers to a virtual invoker. * * @param * @param directory * @return cluster invoker * @throws RpcException */ @Adaptive Invoker join(Directory directory) throws RpcException; } FailoverClusterInvoker

这个cluster实现的是失败自动切换功能,就是当调用失败后,会切换另外一个提供者。

具体过程是,先获取重试次数,根据重试次数循环调用。在循环体内,先通过负载均衡选择一个Invoker,添加到集合中,用来记录哪些invoker被调用过,然后使用这个invoker发起远程调用,如果失败,则重试。

@Override @SuppressWarnings({"unchecked", "rawtypes"}) public Result doInvoke(Invocation invocation, final List invokers, LoadBalance loadbalance) throws RpcException { List copyinvokers = invokers; checkInvokers(copyinvokers, invocation); String methodName = RpcUtils.getMethodName(invocation); // 获取重试次数,Constants.DEFAULT_RETRIES=2,所以默认应该是3次 int len = getUrl().getMethodParameter(methodName, Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1; if (len 0) { checkWhetherDestroyed(); copyinvokers = list(invocation); // check again checkInvokers(copyinvokers, invocation); } // 通过负载均衡选择一个invoker Invoker invoker = select(loadbalance, invocation, copyinvokers, invoked); invoked.add(invoker); RpcContext.getContext().setInvokers((List) invoked); try { // 发起调用 Result result = invoker.invoke(invocation); if (le != null && logger.isWarnEnabled()) { // ... 打印日志 } return result; } catch (RpcException e) { if (e.isBiz()) { // biz exception. throw e; } le = e; } catch (Throwable e) { le = new RpcException(e.getMessage(), e); } finally { providers.add(invoker.getUrl().getAddress()); } } throw new RpcException("...异常信息"); } FailfastClusterInvoker

这个cluster只会远程调用一次,没有失败重试。

@Override public Result doInvoke(Invocation invocation, List invokers, LoadBalance loadbalance) throws RpcException { checkInvokers(invokers, invocation); Invoker invoker = select(loadbalance, invocation, invokers, null); try { return invoker.invoke(invocation); } catch (Throwable e) { if (e instanceof RpcException && ((RpcException) e).isBiz()) { // biz exception. throw (RpcException) e; } throw new RpcException("xxxx"); } } FailbackClusterInvoker

这个cluster会在调用失败后,会先记录本次调用,然后返回一个空结果,并且通过定时任务对失败的调用重试。

@Override protected Result doInvoke(Invocation invocation, List invokers, LoadBalance loadbalance) throws RpcException { try { checkInvokers(invokers, invocation); Invoker invoker = select(loadbalance, invocation, invokers, null); return invoker.invoke(invocation); } catch (Throwable e) { logger.error("xxxx"); // 记录失败的调用 addFailed(invocation, this); return new RpcResult(); // ignore } } private void addFailed(Invocation invocation, AbstractClusterInvoker router) { if (retryFuture == null) { synchronized (this) { if (retryFuture == null) { // 开启定时任务,默认每隔5s执行一次 retryFuture = scheduledExecutorService.scheduleWithFixedDelay(new Runnable() { @Override public void run() { // collect retry statistics try { // 失败重试 retryFailed(); } catch (Throwable t) { // Defensive fault tolerance logger.error("Unexpected error occur at collect statistic", t); } } }, RETRY_FAILED_PERIOD, RETRY_FAILED_PERIOD, TimeUnit.MILLISECONDS); } } } // 把调用失败的添加到Map failed.put(invocation, router); } void retryFailed() { if (failed.size() == 0) { return; } for (Map.Entry>(failed).entrySet()) { Invocation invocation = entry.getKey(); Invoker invoker = entry.getValue(); try { invoker.invoke(invocation); // 调用成功后移除 failed.remove(invocation); } catch (Throwable e) { logger.error("Failed retry to invoke method " + invocation.getMethodName() + ", waiting again.", e); } } } 负载均衡

dubbo主要提供了以下四种负载均衡算法:

image-20210104153643365

AbstractLoadBalance

这是负载均衡的父类,使用模板方法的设计模式。子类根据自己特点实现doSelect方法

@Override public Invoker select(List invokers, URL url, Invocation invocation) { if (invokers == null || invokers.isEmpty()) return null; if (invokers.size() == 1) return invokers.get(0); // 由子类实现 return doSelect(invokers, url, invocation); } protected abstract Invoker doSelect(List invokers, URL url, Invocation invocation);

另外,AbstractLoadBalance还提供计算权重的公共方法,是为了服务预热,如果服务提供者启动没多久,小于预热时间,需要先降低其权重。

/** * 计算权重 * * @param invoker * @param invocation * @return */ protected int getWeight(Invoker invoker, Invocation invocation) { // 获取weight参数值 int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT); if (weight > 0) { // 得到启动时间 long timestamp = invoker.getUrl().getParameter(Constants.REMOTE_TIMESTAMP_KEY, 0L); if (timestamp > 0L) { // 计算启动多久了 int uptime = (int) (System.currentTimeMillis() - timestamp); // 获取warmup参数值 int warmup = invoker.getUrl().getParameter(Constants.WARMUP_KEY, Constants.DEFAULT_WARMUP); if (uptime > 0 && uptime < warmup) { // 启动时间小于预热时间,则降低权重 weight = calculateWarmupWeight(uptime, warmup, weight); } } } return weight; } static int calculateWarmupWeight(int uptime, int warmup, int weight) { int ww = (int) ((float) uptime / ((float) warmup / (float) weight)); return ww < 1 ? 1 : (ww > weight ? weight : ww); } RandomLoadBalance

这个算法是加权随机,dubbo默认的负载均衡算法,算法思想大概是:假设有两台服务器A和B,要求70%的请求落到A上,30%请求落到B上。只要生成一个随机数,范围是[0,10),如果随机数在[0,7),则选择服务器A;如果是[7,10)则选择服务器B。

下面是加权随机的源码:com.alibaba.dubbo.rpc.cluster.loadbalance.RandomLoadBalance#doSelect

@Override protected Invoker doSelect(List invokers, URL url, Invocation invocation) { int length = invokers.size(); // Number of invokers int totalWeight = 0; // The sum of weights boolean sameWeight = true; // Every invoker has the same weight? // 判断是否等权重 for (int i = 0; i < length; i++) { int weight = getWeight(invokers.get(i), invocation); totalWeight += weight; // Sum if (sameWeight && i > 0 && weight != getWeight(invokers.get(i - 1), invocation)) { sameWeight = false; } } if (totalWeight > 0 && !sameWeight) { // If (not every invoker has the same weight & at least one invoker's weight>0), // select randomly based on totalWeight. // 生成一个随机数 int offset = random.nextInt(totalWeight); // Return a invoker based on the random value. for (int i = 0; i < length; i++) { // 随机数与权重相减 offset -= getWeight(invokers.get(i), invocation); // 随机数小于权重,说明落到指定区间 if (offset < 0) { return invokers.get(i); } } } // If all invokers have the same weight value or totalWeight=0, return evenly. return invokers.get(random.nextInt(length)); } LeastActiveLoadBalance

这个是最少活跃数负载均衡,算法思想是,先遍历所有的提供者,获取每个提供者的活跃数,记录最小活跃数和对应的提供者。如果活跃数等于最小活跃数的提供者数量大于1个,则通过加权随机算法选择一个提供者。

@Override protected Invoker doSelect(List invokers, URL url, Invocation invocation) { // Number of invokers int length = invokers.size(); // The least active value of all invokers // 最少活跃数 int leastActive = -1; // The number of invokers having the same least active value (leastActive) // 有多少个活跃数等于最小活跃数的提供者数量 int leastCount = 0; // The index of invokers having the same least active value (leastActive) int[] leastIndexs = new int[length]; // The sum of with warmup weights int totalWeight = 0; // Initial value, used for comparision int firstWeight = 0; // Every invoker has the same weight value? boolean sameWeight = true; // 循环遍历每个提供者 for (int i = 0; i < length; i++) { Invoker invoker = invokers.get(i); // Active number, 获取当前提供者活跃数 int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive(); int afterWarmup = getWeight(invoker, invocation); // Weight // Restart, when find a invoker having smaller least active value. if (leastActive == -1 || active < leastActive) { leastActive = active; // Record the current least active value leastCount = 1; // Reset leastCount, count again based on current leastCount leastIndexs[0] = i; // Reset totalWeight = afterWarmup; // Reset firstWeight = afterWarmup; // Record the weight the first invoker sameWeight = true; // Reset, every invoker has the same weight value? // If current invoker's active value equals with leaseActive, then accumulating. } else if (active == leastActive) { leastIndexs[leastCount++] = i; // Record index number of this invoker totalWeight += afterWarmup; // Add this invoker's weight to totalWeight. // If every invoker has the same weight? if (sameWeight && i > 0 && afterWarmup != firstWeight) { sameWeight = false; } } } // assert(leastCount > 0) if (leastCount == 1) { // If we got exactly one invoker having the least active value, return this invoker directly. return invokers.get(leastIndexs[0]); } // 如果多个提供者的活跃数等于最小活跃数,通过加权随机算法选择其中一个 if (!sameWeight && totalWeight > 0) { // If (not every invoker has the same weight & at least one invoker's weight>0), // select randomly based on totalWeight. int offsetWeight = random.nextInt(totalWeight) + 1; // Return a invoker based on the random value. for (int i = 0; i < leastCount; i++) { int leastIndex = leastIndexs[i]; offsetWeight -= getWeight(invokers.get(leastIndex), invocation); if (offsetWeight 接口名.方法名 private final ConcurrentMap>(); @SuppressWarnings("unchecked") @Override protected Invoker doSelect(List invokers, URL url, Invocation invocation) { String methodName = RpcUtils.getMethodName(invocation); // key -> 接口名.方法名 String key = invokers.get(0).getUrl().getServiceKey() + "." + methodName; // 用于验证对象变化 int identityHashCode = System.identityHashCode(invokers); ConsistentHashSelector selector = (ConsistentHashSelector) selectors.get(key); if (selector == null || selector.identityHashCode != identityHashCode) { // 创建选择器,并缓存 selectors.put(key, new ConsistentHashSelector(invokers, methodName, identityHashCode)); selector = (ConsistentHashSelector) selectors.get(key); } // 调用选择器的select方法 return selector.select(invocation); } private static final class ConsistentHashSelector { // hash环中,保存 值与invoker的映射关系 private final TreeMap virtualInvokers; // 每个invoker的虚拟节点数 private final int replicaNumber; private final int identityHashCode; private final int[] argumentIndex; ConsistentHashSelector(List invokers, String methodName, int identityHashCode) { // 基于红黑树实现的有序map this.virtualInvokers = new TreeMap(); this.identityHashCode = identityHashCode; URL url = invokers.get(0).getUrl(); // 获取虚拟节点数,默认是160 this.replicaNumber = url.getMethodParameter(methodName, "hash.nodes", 160); // 获取需要hash的参数下标,默认获取第1个参数 // 配置例子 String[] index = Constants.COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName, "hash.arguments", "0")); // 暂时发现argumentIndex的作用是toKey方法中用到,即根据参数决定在圆环上的落点 argumentIndex = new int[index.length]; for (int i = 0; i < index.length; i++) { argumentIndex[i] = Integer.parseInt(index[i]); } // 设置虚拟节点与invoker的映射关系 for (Invoker invoker : invokers) { // 获取提供者的ip:port String address = invoker.getUrl().getAddress(); // 以160个虚拟节点为例,生成40份加密,每份经过4次hash,即还是每个invoker有160个虚拟节点 for (int i = 0; i < replicaNumber / 4; i++) { byte[] digest = md5(address + i); for (int h = 0; h < 4; h++) { long m = hash(digest, h); virtualInvokers.put(m, invoker); } } } } public Invoker select(Invocation invocation) { // 根据参数生成key String key = toKey(invocation.getArguments()); byte[] digest = md5(key); // md5加密和hash key,得到对应invoker return selectForKey(hash(digest, 0)); } private String toKey(Object[] args) { StringBuilder buf = new StringBuilder(); for (int i : argumentIndex) { if (i >= 0 && i < args.length) { buf.append(args[i]); } } return buf.toString(); } private Invoker selectForKey(long hash) { // ailMap(hash, true):获取大于hash值的所有key,true表示包括等于。 // .firstEntry() -> 获取第一个 Map.Entry entry = virtualInvokers.tailMap(hash, true).firstEntry(); if (entry == null) { // 如果为null,说明取第一个 entry = virtualInvokers.firstEntry(); } return entry.getValue(); } private long hash(byte[] digest, int number) { return (((long) (digest[3 + number * 4] & 0xFF)


【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3